/*
 * Copyright 2019-2020 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package example.spring.data.nosql.redis.stream.consumer;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.RedisSystemException;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.StreamOperations;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import org.springframework.test.context.ActiveProfiles;

import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.springframework.data.redis.connection.stream.StreamOffset.fromStart;

/**
 * Integration tests that walk through the synchronous Redis Stream API: appending and reading records through
 * {@link StreamOperations} and receiving them continuously through a {@link StreamMessageListenerContainer}.
 * <p>
 * The tests run inside a Spring Boot test context with the {@code single} profile active and issue a
 * {@code FLUSHALL} before every test, so they must not be pointed at a Redis instance holding data worth keeping.
 *
 * @author Christoph Strobl
 * @author Mark Paluch
 */
@ActiveProfiles(value = { "single" })
@SpringBootTest
public class SyncStreamApiTests {

    @Autowired
    StringRedisTemplate template;
    @Autowired
    StreamMessageListenerContainer<String, MapRecord<String, String, String>> messageListenerContainer;

    /** Stream operations obtained from {@link #template}; re-assigned in {@link #setUp()} before each test. */
    StreamOperations<String, String, String> streamOps;

    /**
     * Empties the Redis server and (re-)initializes {@link #streamOps} so that every test starts from a clean state.
     */
    @BeforeEach
    public void setUp() {

        // clear all; running the command through the template's callback API lets the template acquire and
        // release the connection, instead of leaving a manually obtained connection open after every test
        template.execute((RedisCallback<Void>) connection -> {
            connection.flushAll();
            return null;
        });

        streamOps = template.opsForStream();
    }

    /**
     * Exercises the basic stream commands in sequence: appending records with fixed and autogenerated ids, querying
     * the stream length, and reading the stream both from its start and from a given offset.
     */
    @Test
    public void basics() {

        // XADD with fixed id
        RecordId fixedId1 = streamOps.add(SensorData.RECORD_1234_0);
        assertThat(fixedId1).isEqualTo(SensorData.RECORD_1234_0.getId());

        RecordId fixedId2 = streamOps.add(SensorData.RECORD_1234_1);
        assertThat(fixedId2).isEqualTo(SensorData.RECORD_1234_1.getId());

        // XLEN
        assertThat(streamOps.size(SensorData.KEY)).isEqualTo(2L);

        // XADD errors when timestamp is less than last inserted
        assertThatExceptionOfType(RedisSystemException.class).isThrownBy(() -> {
            streamOps.add(SensorData.create("1234", "19.8", "invalid").withId(RecordId.of("0-0")));
        }).withMessageContaining("ID specified");

        // XADD with autogenerated id
        RecordId autogeneratedId = streamOps.add(SensorData.create("1234", "19.8", null));

        assertThat(autogeneratedId.getValue()).endsWith("-0");
        // the rejected XADD above must not have added an entry, so only three records are expected
        assertThat(streamOps.size(SensorData.KEY)).isEqualTo(3L);

        // XREAD from start
        List<MapRecord<String, String, String>> fromStart = streamOps.read(fromStart(SensorData.KEY));
        assertThat(fromStart).hasSize(3)
                             .extracting(MapRecord::getId)
                             .containsExactly(fixedId1, fixedId2, autogeneratedId);

        // XREAD resume after
        List<MapRecord<String, String, String>> fromOffset =
            streamOps.read(StreamOffset.create(SensorData.KEY, ReadOffset.from(fixedId2)));
        assertThat(fromOffset).hasSize(1).extracting(MapRecord::getId).containsExactly(autogeneratedId);
    }

    /**
     * Registers a listener with the message listener container and verifies that records appended afterwards are
     * handed to the listener in insertion order.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @Test
    public void continuousRead() throws InterruptedException {

        // container autostart is disabled by default
        if (!messageListenerContainer.isRunning()) {
            messageListenerContainer.start();
        }

        CapturingStreamListener streamListener = CapturingStreamListener.create();

        // XREAD BLOCK
        Subscription subscription = messageListenerContainer.receive(fromStart(SensorData.KEY), streamListener);

        try {
            // the stream is still empty, so after a short wait the listener must not have seen anything
            TimeUnit.MILLISECONDS.sleep(100);

            assertThat(streamListener.recordsReceived()).isEqualTo(0);

            streamOps.add(SensorData.RECORD_1234_0);
            streamOps.add(SensorData.RECORD_1234_1);

            // NOTE(review): take() presumably blocks until the next record has been captured - confirm against
            // CapturingStreamListener
            assertThat(streamListener.take().getId()).isEqualTo(SensorData.RECORD_1234_0.getId());
            assertThat(streamListener.take().getId()).isEqualTo(SensorData.RECORD_1234_1.getId());
            assertThat(streamListener.recordsReceived()).isEqualTo(2);

            streamOps.add(SensorData.RECORD_1235_0);

            assertThat(streamListener.take().getId()).isEqualTo(SensorData.RECORD_1235_0.getId());
            assertThat(streamListener.recordsReceived()).isEqualTo(3);
        } finally {
            // unregister the listener even when an assertion fails; the container is an injected bean, so a
            // subscription left behind would keep polling the stream while other tests run
            messageListenerContainer.remove(subscription);
        }
    }

}
